import asyncio
from pathlib import Path
import shutil
import tempfile
from typing import AsyncGenerator, List, Sequence, Optional
import re
from typing import Any, Mapping
import uuid
from loguru import logger
from datetime import datetime
from pydantic import Field
from autogen_core import CancellationToken, ComponentModel, Component
from autogen_core.models import (
    ChatCompletionClient,
    UserMessage,
    SystemMessage,
)
from pydantic import BaseModel
from typing_extensions import Self

from autogen_agentchat.agents import BaseChatAgent
from autogen_core.code_executor import CodeBlock, CodeExecutor
from autogen_core.model_context import (
    ChatCompletionContext,
    TokenLimitedChatCompletionContext,
)
from autogen_agentchat.base import Response
from autogen_agentchat.state import BaseState
from autogen_agentchat.messages import (
    BaseAgentEvent,
    BaseChatMessage,
    TextMessage,
    MessageFactory,
)
from autogen_core.code_executor import CodeResult
from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor

from ..utils import thread_to_context
from ._utils import exec_command_umask_patched

from ..approval_guard import BaseApprovalGuard
from ..guarded_action import ApprovalDeniedError, TrivialGuardedAction

# Monkey-patch the Docker executor's command runner with a umask-adjusted
# variant from ._utils. NOTE(review): presumably this relaxes the umask so
# files created inside the container are accessible outside it, and assumes
# exec_command_umask_patched matches _execute_command's signature — confirm
# when upgrading autogen-ext.
DockerCommandLineCodeExecutor._execute_command = exec_command_umask_patched  # type: ignore


def _extract_markdown_code_blocks(markdown_text: str) -> List[CodeBlock]:
    """Parse fenced code blocks out of a markdown string.

    Returns one ``CodeBlock`` per ``` fence found, in document order. A
    language tag on the opening fence (e.g. ```python) becomes the block's
    language; fences without a tag yield an empty language string.
    """
    fence_pattern = re.compile(r"```(?:\s*([\w\+\-]+))?\n([\s\S]*?)```")
    return [
        CodeBlock(code=body, language=lang.strip() if lang else "")
        for lang, body in fence_pattern.findall(markdown_text)
    ]


async def _invoke_action_guard(
    thread: Sequence[BaseChatMessage | BaseAgentEvent],
    delta: Sequence[BaseChatMessage | BaseAgentEvent],
    code_message: TextMessage,
    agent_name: str,
    model_client: ChatCompletionClient,
    approval_guard: BaseApprovalGuard | None,
) -> None:
    """Ask the approval guard whether the proposed code may be executed.

    The guarded action raises ApprovalDeniedError when the user declines.
    Approval is requested once per code/execute/debug round; we do not
    attempt interactive per-block approval here, because most users lack
    the coding context needed to judge incremental execution of blocks.
    """
    guarded_action = TrivialGuardedAction("coding", baseline_override="maybe")

    # The delta is expected to already end with the code message under review.
    assert delta[-1] == code_message

    full_thread = [*thread, *delta]
    llm_context = thread_to_context(
        full_thread,
        agent_name,
        is_multimodal=model_client.model_info["vision"],
    )
    prompt_for_user = TextMessage(
        content="Do you want to execute the code above?",
        source=agent_name,
    )
    await guarded_action.invoke_with_approval(
        {}, code_message, llm_context, approval_guard, prompt_for_user
    )


async def _coding_and_debug(
    system_prompt: str,
    thread: Sequence[BaseChatMessage],
    agent_name: str,
    model_client: ChatCompletionClient,
    code_executor: CodeExecutor,
    max_debug_rounds: int,
    cancellation_token: CancellationToken,
    model_context: ChatCompletionContext,
    approval_guard: BaseApprovalGuard | None,
) -> AsyncGenerator[TextMessage | bool, None]:
    """Write and debug code using the model and executor.

    It generates code based on the system prompt and the thread of messages,
    executes it, and returns the output. It continues to generate and execute
    code until the maximum number of debug rounds is reached or the code
    executes successfully.

    When the cancellation token is set, the execution will stop.

    Args:
        system_prompt (str): The system prompt to guide the model.
        thread (Sequence[BaseChatMessage]): The thread of messages to use as context.
        agent_name (str): The name of the agent.
        model_client (ChatCompletionClient): The model client to use for code generation.
        code_executor (CodeExecutor): The code executor to use for executing the code.
        max_debug_rounds (int): The maximum number of debug rounds to perform.
        cancellation_token (CancellationToken): The cancellation token to stop execution.
        model_context (ChatCompletionContext): The context used to trim prompts to the token limit.
        approval_guard (BaseApprovalGuard | None): The approval guard to use for code execution.

    Yields:
        TextMessage: The intermediate messages generated by the model and executor.
        bool: Yielded once at the end; True if any code execution was performed.

    Raises:
        ApprovalDeniedError: If the user denies the approval for the coding request.
    """
    # The list of new messages to be added to the thread.
    # Annotated as List (not Sequence) because it is mutated via append below.
    delta: List[BaseChatMessage | BaseAgentEvent] = []
    executed_code = False

    for _round in range(max_debug_rounds):
        # Add system prompt as the last message before generation.
        current_thread = (
            list(thread)
            + list(delta)
            + [TextMessage(source="user", content=system_prompt)]
        )

        # Create an LLM context from the system message, global chat history,
        # and inner messages.
        context = [SystemMessage(content=system_prompt)] + thread_to_context(
            current_thread,
            agent_name,
            is_multimodal=model_client.model_info["vision"],
        )

        # Re-initialize model context to meet the token limit quota; fall back
        # to the untrimmed context if trimming fails for any reason.
        try:
            await model_context.clear()
            for msg in context:
                await model_context.add_message(msg)
            token_limited_context = await model_context.get_messages()
        except Exception:
            token_limited_context = context

        # Generate code using the model.
        create_result = await model_client.create(
            messages=token_limited_context, cancellation_token=cancellation_token
        )
        assert isinstance(create_result.content, str)
        code_msg = TextMessage(
            source=agent_name + "-llm",
            metadata={"internal": "no", "type": "potential_code"},
            content=create_result.content,
        )
        # Add the LLM's response to the current thread.
        delta.append(code_msg)
        yield code_msg

        # Extract code blocks from the LLM's response; if there is no code to
        # execute, the model is done and so are we.
        code_block_list = _extract_markdown_code_blocks(create_result.content)
        if len(code_block_list) == 0:
            break

        # Now that we know we have code to execute, make sure we have permission
        # to do so. Permission is requested per round of code/execute/debug.
        if approval_guard is not None:
            await _invoke_action_guard(
                thread=thread,
                delta=delta,
                code_message=code_msg,  # note that this `code_msg is delta[-1]`
                agent_name=agent_name,
                model_client=model_client,
                approval_guard=approval_guard,
            )

        code_output_list: List[str] = []
        exit_code_list: List[int] = []
        executed_code = True
        try:
            # BUGFIX: results were previously labeled with the debug-round index
            # (further corrupted by a shadowing loop below), so every block in a
            # round carried the same — often wrong — number. Enumerate the
            # blocks so each result is numbered by its position in this round.
            for block_index, cb in enumerate(code_block_list):
                # Execute the code block.
                exit_code: int = 1
                encountered_exception: bool = False
                code_output: str = ""
                result: CodeResult | None = None
                try:
                    result = await code_executor.execute_code_blocks(
                        [cb], cancellation_token
                    )
                    exit_code = result.exit_code or 0
                    code_output = result.output
                except Exception as e:
                    code_output = str(e)
                    encountered_exception = True
                if encountered_exception or result is None:
                    code_output = f"An exception occurred while executing the code block: {code_output}"
                elif code_output.strip() == "":
                    # No output
                    code_output = f"The script ran but produced no output to console. The POSIX exit code was: {result.exit_code}. If you were expecting output, consider revising the script to ensure content is printed to stdout."
                elif exit_code != 0:
                    # Error
                    code_output = f"The script ran, then exited with an error (POSIX exit code: {result.exit_code})\nIts output was:\n{result.output}"
                code_output_list.append(code_output)
                code_output_msg = TextMessage(
                    source=agent_name + "-executor",
                    metadata={"internal": "no", "type": "code_execution"},
                    content=f"Execution result of code block {block_index + 1}:\n```console\n{code_output}\n```",
                )
                exit_code_list.append(exit_code)
                yield code_output_msg

            # Combine all per-block outputs into one executor transcript.
            final_code_output = ""
            for output_index, code_output in enumerate(code_output_list):
                final_code_output += f"\n\nExecution Result of Code Block {output_index + 1}:\n```console\n{code_output}\n```"

            # Add the executor's combined response to the thread.
            executor_msg = TextMessage(
                source=agent_name + "-executor",
                metadata={"internal": "yes"},
                content=final_code_output,
            )
            delta.append(executor_msg)
            yield executor_msg

            # Stop debugging once every code block exited successfully.
            if all(ec == 0 for ec in exit_code_list):
                break
        except asyncio.TimeoutError:
            # If the task times out, we treat it as an error.
            executor_msg = TextMessage(
                source=agent_name + "-executor",
                metadata={"internal": "yes"},
                content="Code execution timed out.",
            )
            delta.append(executor_msg)
            yield executor_msg

    # Final sentinel: tells the caller whether any code was executed.
    yield executed_code


async def _summarize_coding(
    agent_name: str,
    model_client: ChatCompletionClient,
    thread: Sequence[BaseChatMessage | BaseAgentEvent],
    cancellation_token: CancellationToken,
    model_context: ChatCompletionContext,
) -> TextMessage:
    """Summarize the coding transcript with one extra LLM call.

    Builds a prompt from a fixed system message, the converted thread, and a
    trailing summarization request, trims it to the model's token limit when
    possible, and returns the model's summary as an internal message
    attributed to the agent.
    """
    transcript = thread_to_context(
        list(thread), agent_name, is_multimodal=model_client.model_info["vision"]
    )
    summary_request = UserMessage(
        content="""
                The above is a transcript of your previous messages and a request that was given to you in the begining.
                You need to summarize them to answer the request given to you. Generate a summary of everything that happened.
                If there was code that was executed, please copy the final code that was executed without errors.
                Don't mention that this is a summary, just give the summary.""",
        source="user",
    )
    input_messages = (
        [SystemMessage(content="You are an agent that can write and debug code")]
        + transcript
        + [summary_request]
    )

    # Trim the prompt to the token limit; fall back to the full prompt if
    # the token-limited context fails for any reason.
    try:
        await model_context.clear()
        for message in input_messages:
            await model_context.add_message(message)
        trimmed_messages = await model_context.get_messages()
    except Exception:
        trimmed_messages = input_messages

    summary_result = await model_client.create(
        messages=trimmed_messages, cancellation_token=cancellation_token
    )
    assert isinstance(summary_result.content, str)
    return TextMessage(
        source=agent_name,
        metadata={"internal": "yes"},
        content=summary_result.content,
    )


class CoderAgentConfig(BaseModel):
    """Serializable configuration used to (re)construct a CoderAgent component."""

    # Agent name passed to the BaseChatAgent constructor.
    name: str
    # Serialized model client component used for code generation.
    model_client: ComponentModel
    description: str = """
    An agent that can write and execute code to solve tasks or use its language skills to summarize, write, solve math and logic problems.
    It understands images and can use them to help it complete the task.
    It can access files if given the path and manipulate them using python code. Use the coder if you want to manipulate a file or read a csv or excel files.
    """
    # Maximum number of code/execute/debug iterations per request.
    max_debug_rounds: int = 3
    # Whether to produce an extra LLM summary of the execution transcript.
    summarize_output: bool = False
    # Optionally add code_executor config if needed


class CoderAgentState(BaseState):
    """Persistable state for CoderAgent: the accumulated chat history."""

    # NOTE(review): `default_factory=list[BaseChatMessage]` calls the generic
    # alias, which produces an empty list; plain `list` is the conventional
    # spelling with identical behavior.
    chat_history: List[BaseChatMessage] = Field(default_factory=list[BaseChatMessage])
    # Discriminator used when deserializing agent state.
    type: str = Field(default="CoderAgentState")


class CoderAgent(BaseChatAgent, Component[CoderAgentConfig]):
    """An agent capable of writing, executing, and debugging code.

    The agent uses either a local or Docker-based code executor to run the generated code
    in a controlled environment. It maintains a chat history and can be paused/resumed
    during execution.
    """

    # Component registration metadata for autogen serialization.
    component_type = "agent"
    component_config_schema = CoderAgentConfig
    component_provider_override = "magentic_ui.agents.CoderAgent"

    DEFAULT_DESCRIPTION = """
    An agent that can write and execute code to solve tasks or use its language skills to summarize, write, solve math and logic problems.
    It understands images and can use them to help it complete the task.
    It can access files if given the path and manipulate them using python code. Use the coder if you want to manipulate a file or read a csv or excel files.
    In a single step when you ask the agent to do something: it can write code, and then immediately execute the code. If there are errors it can debug the code and try again. 
    """

    # System prompt template; {date_today} is filled in per request.
    system_prompt_coder_template = """
    You are helpful assistant.
    In addition to responding with text you can write code and execute code that you generate.
    The date today is: {date_today}

    Rules to follow for Code:
    - Generate py or sh code blocks in the order you'd like your code to be executed.
    - Code block must indicate language type. Do not try to predict the answer of execution. Code blocks will be automatically executed for you.
    - If you want to stop executing code, make sure to not write any code in your message and your turn will be over.
    - Do not generate code that relies on API keys that you don't have access to. Try different approaches.

    Tips:
    - You don't have to generate code if the task is not related to code, for instance writing a poem, paraphrasing a text, etc.
    - If you are asked to solve math or logical problems, first try to answer them without code and then if needed try to use python to solve them.
    - You have access to the standard Python libraries in addition to numpy, pandas, scikit-learn, matplotlib, pillow, requests, beautifulsoup4.
    - If you need to use an external library, write first a shell script that installs the library first using pip install, then add code blocks to use the library.
    - Always use print statements to output your work and partial results.
    - For showing plots or other visualizations that are not just text, make sure to save them to file with the right extension for them to be displayed.

   VERY IMPORTANT: If you intend to write code to be executed, do not end your response without a code block. If you want to write code you must provide a code block in the current generation.
    """

    def __init__(
        self,
        name: str,
        model_client: ChatCompletionClient,
        model_context_token_limit: int = 128000,
        description: str = DEFAULT_DESCRIPTION,
        max_debug_rounds: int = 3,
        summarize_output: bool = False,
        code_executor: Optional[CodeExecutor] = None,
        work_dir: Path | str | None = None,
        bind_dir: Path | str | None = None,
        use_local_executor: bool = False,
        approval_guard: BaseApprovalGuard | None = None,
    ) -> None:
        """Initialize the CoderAgent.

        Args:
            name (str): The name of the agent
            model_client (ChatCompletionClient): The language model client to use
            model_context_token_limit (int, optional): Token limit for the context used to trim prompts. Default: 128000.
            description (str, optional): Description of the agent's capabilities. Default: DEFAULT_DESCRIPTION.
            max_debug_rounds (int, optional): Maximum number of code debugging iterations. Default: 3.
            summarize_output (bool, optional): Whether to summarize code execution results. Default: False.
            code_executor (Optional[CodeExecutor], optional): Custom code executor to use. Default: None.
            work_dir (Path | str | None, optional): Working directory for code execution. Default: None.
            bind_dir (Path | str | None, optional): Directory to bind for Docker executor. Default: None.
            use_local_executor (bool, optional): Whether to use local instead of Docker executor. Default: False.
            approval_guard (BaseApprovalGuard | None, optional): Guard consulted before executing generated code. Default: None.
        """
        super().__init__(name, description)
        self._model_client = model_client
        self._model_context = TokenLimitedChatCompletionContext(
            model_client, token_limit=model_context_token_limit
        )
        self._chat_history: List[BaseChatMessage] = []
        self._max_debug_rounds = max_debug_rounds
        self._summarize_output = summarize_output
        self.is_paused = False
        # Event set while the agent is paused; monitored during execution.
        self._paused = asyncio.Event()
        self._approval_guard = approval_guard

        # Use a throwaway temp dir (cleaned up in close()) unless the caller
        # provided a working directory, which we then never delete.
        if work_dir is None:
            self._work_dir = Path(tempfile.mkdtemp())
            self._cleanup_work_dir = True
        else:
            self._work_dir = Path(work_dir)
            self._cleanup_work_dir = False
        if code_executor:
            self._code_executor = code_executor
        elif use_local_executor:
            self._code_executor = LocalCommandLineCodeExecutor(work_dir=self._work_dir)
        else:
            # Suffix with a UUID so concurrent agents get distinct containers.
            name = f"{name}-{uuid.uuid4()}"
            self._code_executor = DockerCommandLineCodeExecutor(
                container_name=name,
                image="magentic-ui-python-env",
                work_dir=self._work_dir,
                bind_dir=bind_dir,
                delete_tmp_files=True,
            )

    async def lazy_init(self) -> None:
        """Initialize the code executor if it has a start method.

        This method is called after initialization to set up any async resources
        needed by the code executor.
        """
        if self._code_executor:
            # check if the code executor has a start method
            if hasattr(self._code_executor, "start"):
                # TODO: we should add a no-op start() method to the base class.
                await self._code_executor.start()  # type: ignore

    async def close(self) -> None:
        """Clean up resources used by the agent.

        This method:
        - Stops the code executor
        - Removes the work directory if it was created
        - Closes the model client
        """
        logger.info("Closing Coder...")
        # NOTE(review): assumes the executor exposes stop(); unlike lazy_init's
        # start(), this is not guarded with hasattr — confirm for custom executors.
        await self._code_executor.stop()
        # Remove the work directory if it was created.
        if self._cleanup_work_dir and self._work_dir.exists():
            await asyncio.to_thread(shutil.rmtree, self._work_dir)
        # Close the model client.
        await self._model_client.close()

    async def pause(self) -> None:
        """Pause the agent by setting the paused state."""
        self.is_paused = True
        self._paused.set()

    async def resume(self) -> None:
        """Resume the agent by clearing the paused state."""
        self.is_paused = False
        self._paused.clear()

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        """Get the types of messages produced by the agent."""
        return (TextMessage,)

    async def on_messages(
        self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
    ) -> Response:
        """Handle incoming messages and return a single response. Calls the on_messages_stream."""
        response: Response | None = None
        # The final Response yielded by the stream wins; intermediate
        # messages are dropped here.
        async for message in self.on_messages_stream(messages, cancellation_token):
            if isinstance(message, Response):
                response = message
        assert response is not None
        return response

    async def on_messages_stream(
        self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
        """Handle incoming messages and yield responses as a stream. Append the request to agents chat history.

        Yields intermediate code/execution messages followed by a final
        Response. Refuses immediately (with an internal message) when paused.
        """
        if self.is_paused:
            yield Response(
                chat_message=TextMessage(
                    content="The Coder is paused.",
                    source=self.name,
                    metadata={"internal": "yes"},
                )
            )
            return
        self._chat_history.extend(messages)
        last_message_received: BaseChatMessage = messages[-1]
        inner_messages: List[BaseChatMessage] = []

        # Set up the cancellation token for the code execution.
        code_execution_token = CancellationToken()

        # Cancel the code execution if the handler's cancellation token is set.
        cancellation_token.add_callback(lambda: code_execution_token.cancel())

        # Set up background task to monitor the pause event and cancel the code execution if paused.
        async def monitor_pause() -> None:
            await self._paused.wait()
            code_execution_token.cancel()

        monitor_pause_task = asyncio.create_task(monitor_pause())

        system_prompt_coder = self.system_prompt_coder_template.format(
            date_today=datetime.now().strftime("%Y-%m-%d")
        )

        try:
            executed_code = False
            # Run the code execution and debugging process.
            async for msg in _coding_and_debug(
                system_prompt=system_prompt_coder,
                thread=self._chat_history,
                agent_name=self.name,
                model_client=self._model_client,
                code_executor=self._code_executor,
                max_debug_rounds=self._max_debug_rounds,
                cancellation_token=code_execution_token,
                model_context=self._model_context,
                approval_guard=self._approval_guard,
            ):
                # A bool is the generator's final sentinel: whether code ran.
                if isinstance(msg, bool):
                    executed_code = msg
                    break
                inner_messages.append(msg)
                self._chat_history.append(msg)
                yield msg

            # New conditional block based on the configuration flag.
            if self._summarize_output and executed_code:
                summary_msg = await _summarize_coding(
                    agent_name=self.name,
                    model_client=self._model_client,
                    thread=[last_message_received] + inner_messages,
                    cancellation_token=code_execution_token,
                    model_context=self._model_context,
                )
                self._chat_history.append(summary_msg)
                yield Response(chat_message=summary_msg, inner_messages=inner_messages)
            else:
                # Instead of only executor output, return a transcript of all code and execution steps.
                combined_output = ""
                for txt_msg in inner_messages:
                    assert isinstance(txt_msg, TextMessage)
                    combined_output += f"{txt_msg.content}\n"
                final_response_msg = TextMessage(
                    source=self.name,
                    metadata={"internal": "yes"},
                    content=combined_output or "No output.",
                )
                # TODO: do we not want to add this to the chat history?
                yield Response(
                    chat_message=final_response_msg, inner_messages=inner_messages
                )
        except ApprovalDeniedError:
            # If the user denies the approval, we respond with a message.
            yield Response(
                chat_message=TextMessage(
                    content="The user did not approve the code execution.",
                    source=self.name,
                    metadata={"internal": "no"},
                ),
                inner_messages=inner_messages,
            )
        except asyncio.CancelledError:
            # If the task is cancelled, we respond with a message.
            yield Response(
                chat_message=TextMessage(
                    content="The task was cancelled by the user.",
                    source=self.name,
                    metadata={"internal": "yes"},
                ),
                inner_messages=inner_messages,
            )
        except Exception as e:
            logger.error(f"Error in CoderAgent: {e}")
            # add to chat history
            self._chat_history.append(
                TextMessage(
                    content=f"An error occurred while executing the code: {e}",
                    source=self.name,
                )
            )
            yield Response(
                chat_message=TextMessage(
                    content=f"An error occurred in the coder agent: {e}",
                    source=self.name,
                    metadata={"internal": "no"},
                ),
                inner_messages=inner_messages,
            )
        finally:
            # Cancel the monitor task.
            try:
                monitor_pause_task.cancel()
                # Awaiting the cancelled task lets it unwind; the resulting
                # CancelledError is expected and swallowed.
                await monitor_pause_task
            except asyncio.CancelledError:
                pass

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Clear the chat history."""
        self._chat_history.clear()

    def _to_config(self) -> CoderAgentConfig:
        """Convert the agent's state to a configuration object."""
        return CoderAgentConfig(
            name=self.name,
            model_client=self._model_client.dump_component(),
            description=self.description,
            max_debug_rounds=self._max_debug_rounds,
            summarize_output=self._summarize_output,
            # TODO: Optionally add code_executor configuration if supported
        )

    @classmethod
    def _from_config(cls, config: CoderAgentConfig) -> Self:
        """Create an agent instance from a configuration object."""
        return cls(
            name=config.name,
            model_client=ChatCompletionClient.load_component(config.model_client),
            description=config.description,
            max_debug_rounds=config.max_debug_rounds,
            summarize_output=config.summarize_output,
            # TODO: Optionally load code_executor from config if provided
        )

    async def save_state(self) -> Mapping[str, Any]:
        """
        Save the state of the agent.

        Returns a mapping with the serialized chat history.
        """
        return {
            "chat_history": [msg.dump() for msg in self._chat_history],
        }

    async def load_state(self, state: Mapping[str, Any]) -> None:
        """
        Load the state of the agent.

        Appends (does not replace) deserialized messages to the chat history.
        """
        # Create message factory for deserialization.
        message_factory = MessageFactory()
        for msg_data in state["chat_history"]:
            msg = message_factory.create(msg_data)
            assert isinstance(msg, BaseChatMessage)
            self._chat_history.append(msg)